[SPARK-38094] Enable matching schema column names by field ids #35385
Conversation
Can one of the admins verify this patch?
Thanks for opening a PR. I left a few comments and would appreciate it if you could address them.
```scala
val PARQUET_FIELD_ID_ENABLED =
  buildConf("spark.sql.parquet.fieldId.enabled")
    .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
      " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
```
How does it work when there is a mixture of columns that have field id set and ones that don't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would try to match by id if the id exists; otherwise, it would fall back to matching by name.
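For intuition, here is a minimal sketch of the fallback behavior described above (this is an assumption-laden illustration, not the PR's actual implementation; `resolve`, `parquetById`, and `parquetByName` are hypothetical names):

```scala
import org.apache.spark.sql.types.StructField

object FieldMatchSketch {
  // Metadata key for field ids, as described in this PR.
  val FieldIdKey = "parquet.field.id"

  // Hypothetical resolver: map a requested Spark field to a Parquet column.
  // If the Spark field carries a field id, match by id; otherwise fall back
  // to matching by column name.
  def resolve(
      requested: StructField,
      parquetById: Map[Int, String],
      parquetByName: Map[String, String]): Option[String] = {
    if (requested.metadata.contains(FieldIdKey)) {
      parquetById.get(requested.metadata.getLong(FieldIdKey).toInt)
    } else {
      parquetByName.get(requested.name)
    }
  }
}
```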
My understanding is that the code would use field ids if the flag is enabled; if the flag is disabled, the code would use names instead. My main concern is ambiguity resolution in the schema.
Ah, I meant that even when this flag is enabled, my statement above still applies: the matching is done on a best-effort basis.
Disabling this flag completely avoids reading and writing field ids.
Just for my own knowledge: what needs to be done to make parquet-mr support field id?
I am still investigating. Previously I thought it required support from parquet-mr, but now it looks like that's not necessary. I am working on a fix locally, which might be pushed out as part of this PR or another.
@jackierwzhang I probably need to change the format specification to make the field id unique.
Got it. As for duplicated field ids: in my approach, reading parquet files with duplicated ids across different groups is allowed; essentially, we just don't want ambiguity when matching fields that are on the same level in the schema. Btw, just curious: since you have been working on field id resolution for parquet-mr, do you know whether it currently supports reading and writing field ids yet?
Sounds reasonable. I hope I can do the same too, but it seems to me that I need to resolve the column by id only, which requires the id to be unique in the entire schema. This is going to be a breaking change; not sure if I am allowed to do it or not. It doesn't seem to me that parquet-mr supports reading and writing field ids yet. The field ids are not in …
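To illustrate the duplicate-id rule discussed above with Spark schemas, here is a hedged sketch suitable for pasting into a spark-shell (the `withId` helper is hypothetical, not from the PR): ids may repeat across different nesting levels, while siblings sharing an id is the ambiguous case.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: attach a parquet.field.id to a field's metadata.
def withId(id: Long): Metadata =
  new MetadataBuilder().putLong("parquet.field.id", id).build()

// Tolerated: id 1 appears at the top level and again inside a nested group.
val acrossGroups = StructType(Seq(
  StructField("top", IntegerType, nullable = true, metadata = withId(1)),
  StructField("nested", StructType(Seq(
    StructField("inner", IntegerType, nullable = true, metadata = withId(1))
  )), nullable = true, metadata = withId(2))
))

// Ambiguous: two sibling fields on the same level share id 3.
val sameLevel = StructType(Seq(
  StructField("x", IntegerType, nullable = true, metadata = withId(3)),
  StructField("y", IntegerType, nullable = true, metadata = withId(3))
))
```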
@huaxingao Are you suggesting the code that: … aren't really doing anything?
@jackierwzhang |
Looks good. I left a few comments and would appreciate it if you could take a look. Thanks!
Got it. I was asking because I tested locally and found that parquet-mr can actually save and read field ids via Spark, so I don't have to patch anything in the parquet-mr repo. Though there are a couple of small problems remaining for id matching on the parquet-mr side, I believe it's possible to extend this PR (or open another) to enable Spark to match by id in that code path; I'm going to do that soon.
Looks good. I left a few minor comments; I would appreciate it if you could follow up. Thank you!
```diff
-  SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+  SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+  // Enable parquet read field id for tests to ensure correctness
+  SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"
```
Does this mean that we will not test match by name and will always test by field id?
Not really. Again, enabling this flag only tries to match field ids if they exist, whereas disabling it completely skips field id matching. So if I read with a Spark schema that has no ids at all and turn on this flag, the result is exactly the same as name matching.
I wanted to enable this flag for all tests to detect any regressions in existing test cases, in case this flag is turned on by default in the future.
But they would exist once we start writing field ids for all of the fields, wouldn't they?
Yeah, but it requires the original schema to contain `parquet.field.id` metadata, which is not present in any of the existing suites, so it should behave exactly like name matching.
Turning this on actually ensures that we didn't introduce any regression for existing code under this mixed matching mode, and detects whether this metadata field has been used anywhere.
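As a concrete sketch of what "schema contains `parquet.field.id` metadata" means (assumed illustration, not code from the PR):

```scala
import org.apache.spark.sql.types._

// Carries the parquet.field.id key, so id matching can kick in on read.
val withIdField = StructField("a", IntegerType, nullable = true,
  metadata = new MetadataBuilder().putLong("parquet.field.id", 1L).build())

// Typical field in the existing suites: no id metadata, so even with
// PARQUET_FIELD_ID_READ_ENABLED on, matching degrades to name matching.
val plainField = StructField("a", IntegerType)
```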
This looks pretty good to me. Just some cosmetic comments.
```scala
def hasFieldId(field: StructField): Boolean =
  field.metadata.contains(FIELD_ID_METADATA_KEY)

def getFieldId(field: StructField): Int = {
```
Maybe we can consider combining `getFieldId` and `hasFieldId` into a single method: `def getFieldId(field: StructField): Option[Int]`.
IMHO, this is fine. I see that `hasFieldId()` is used separately, and the assertion would still have to be implemented somewhere anyway. As long as there is a test for this, it should be good.
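For reference, a sketch of the combined accessor suggested above (hypothetical, assuming the key string and the Int-range assertion; not necessarily what was merged):

```scala
import org.apache.spark.sql.types.StructField

val FIELD_ID_METADATA_KEY = "parquet.field.id"

// Single accessor folding hasFieldId/getFieldId together: None when the
// field carries no id, Some(id) otherwise, with the range assertion kept.
def getFieldId(field: StructField): Option[Int] =
  if (field.metadata.contains(FIELD_ID_METADATA_KEY)) {
    val id = field.metadata.getLong(FIELD_ID_METADATA_KEY)
    require(id.isValidInt, s"Field id $id of '${field.name}' does not fit in an Int")
    Some(id.toInt)
  } else {
    None
  }
```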
Looks good. I think you can remove the WIP label from your PR as it is no longer work in progress.
Approved pending addressed comments.
Thanks, merging to master!
### What changes were proposed in this pull request?

Minor follow-ups on #35385:

1. Add a nested schema test
2. Fixed an error message.

### Why are the changes needed?

Better observability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test

Closes #35700 from jackierwzhang/SPARK-38094-minor.

Authored-by: jackierwzhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
Field Id is a native field in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398)
After this PR, when the requested Spark schema contains field IDs, Parquet readers will first use those IDs to determine which Parquet columns to read, before falling back to matching by column name.
This PR supports:
Why are the changes needed?
It enables matching columns by field id for supported DWs like Iceberg and Delta. Specifically, it enables easy conversion from Iceberg (which matches columns by field id rather than by name) to Delta, and allows `id` mode for Delta column mapping.

Does this PR introduce any user-facing change?
This PR introduces three new configurations:

- `spark.sql.parquet.fieldId.write.enabled`: If enabled, Spark will write out the native field ids that are stored inside StructField's metadata as `parquet.field.id` to parquet files. This configuration defaults to `true`.
- `spark.sql.parquet.fieldId.read.enabled`: If enabled, Spark will attempt to read field ids in parquet files and utilize them for matching columns. This configuration defaults to `false`, so Spark maintains its existing behavior by default.
- `spark.sql.parquet.fieldId.read.ignoreMissing`: If enabled, Spark will read parquet files that do not have any field ids, while still attempting to match the columns by id in the Spark schema; nulls will be returned for Spark columns without a match. This configuration defaults to `false`, so Spark can alert the user in case field id matching is expected but the parquet files do not have any ids.
How was this patch tested?

Existing tests + new unit tests.